Conversation
EngHabu left a comment
LGTM, @pingsutw @wild-endeavor should advise though
```python
def cluster_service(self) -> ClusterService:
    return self._cluster_service

async def get_dataproxy_for_resource(self, operation: int, resource: object) -> DataProxyService:
```
Should this operation be an enum instead?
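A minimal sketch of what typing `operation` with an enum could look like. The enum name and members below are hypothetical stand-ins (the real values would come from the generated `SelectClusterRequest.Operation` proto enum); `IntEnum` is used because protobuf enums behave like ints, so this stays wire-compatible:

```python
from enum import IntEnum

# Hypothetical stand-in for the generated proto enum; member names are
# illustrative, not the SDK's actual values.
class Operation(IntEnum):
    UNSPECIFIED = 0
    CREATE_UPLOAD_LOCATION = 1
    UPLOAD_INPUTS = 2

# The signature becomes self-documenting instead of taking a bare int.
async def get_dataproxy_for_resource(operation: Operation, resource: object):
    ...

# IntEnum members compare equal to plain ints, so callers passing the
# raw proto value keep working.
print(Operation.UPLOAD_INPUTS == 2)
```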
```python
# Build the SelectClusterRequest with the right oneof field
req = cluster_payload_pb2.SelectClusterRequest(operation=operation)
if hasattr(resource, "DESCRIPTOR"):
    field_map = {
        "OrgIdentifier": "org_id",
        "ProjectIdentifier": "project_id",
        "TaskIdentifier": "task_id",
        "ActionIdentifier": "action_id",
        "ActionAttemptIdentifier": "action_attempt_id",
    }
    field_name = field_map.get(type(resource).__name__)
    if field_name:
        getattr(req, field_name).CopyFrom(resource)
```
Is this the best way to create the oneof 😬, @wild-endeavor @pingsutw ?
we can do something like

```python
req = SelectClusterRequest(operation=operation)
if hasattr(resource, "DESCRIPTOR"):
    oneof = req.DESCRIPTOR.oneofs_by_name["resource"]  # replace with actual oneof name
    for field in oneof.fields:
        if field.message_type is resource.DESCRIPTOR:
            getattr(req, field.name).CopyFrom(resource)
            break
```
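The descriptor-matching technique suggested here can be exercised against any message with a message-typed oneof. As a self-contained stand-in for `SelectClusterRequest` (whose generated module isn't available here), the sketch below uses the well-known `google.protobuf.Value`, whose `kind` oneof has message-typed `struct_value` / `list_value` fields:

```python
from google.protobuf import struct_pb2

# Stand-in resource message (plays the role of e.g. ProjectIdentifier).
resource = struct_pb2.Struct()
resource.fields["cluster"].string_value = "primary"

# Stand-in request message with a oneof (plays the role of SelectClusterRequest).
value = struct_pb2.Value()
oneof = value.DESCRIPTOR.oneofs_by_name["kind"]
for field in oneof.fields:
    # Scalar oneof fields have message_type None, so this only matches
    # the field whose message type equals the resource's own descriptor.
    if field.message_type is resource.DESCRIPTOR:
        getattr(value, field.name).CopyFrom(resource)
        break

# The oneof is now populated via the matched field, no name table needed.
print(value.WhichOneof("kind"))
```

Compared with the hand-maintained `field_map`, this derives the mapping from the proto descriptor itself, so adding a new identifier type to the oneof requires no SDK change.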
```python
resp = await self._cluster_service.select_cluster(req)
cluster_endpoint = resp.cluster_endpoint
```
Are you going to normalize this here, stripping/adding http/s or dns:///?
not necessary! `create_session_config` already calls `normalize_rpc_endpoint`
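For readers unfamiliar with the helper being referenced: the function below is a hypothetical illustration of the kind of normalization being discussed (stripping an http/https scheme and prefixing a gRPC `dns:///` target), not the SDK's actual `normalize_rpc_endpoint` implementation:

```python
# Illustrative sketch only; the real normalize_rpc_endpoint lives in the
# SDK and may behave differently.
def normalize_rpc_endpoint(endpoint: str) -> str:
    # Drop any scheme the caller may have included.
    for prefix in ("http://", "https://", "dns:///"):
        if endpoint.startswith(prefix):
            endpoint = endpoint[len(prefix):]
    # gRPC channel targets use the dns:/// resolver syntax.
    return f"dns:///{endpoint}"

print(normalize_rpc_endpoint("https://flyte.example.com:443"))
```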
```python
if field_name:
    getattr(req, field_name).CopyFrom(resource)

resp = await self._cluster_service.select_cluster(req)
```
Let's make sure we are throwing informative errors here for the infamous "no healthy clusters" error; we now have a good place to catch that and raise a clear error to the user.
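A self-contained sketch of the kind of error handling this comment is asking for. All names here (`NoHealthyClustersError`, `select_cluster_or_raise`, the fake service) are illustrative, not the SDK's actual API; the point is catching the empty-endpoint case at the SelectCluster call site and surfacing an actionable message:

```python
import asyncio
from types import SimpleNamespace


class NoHealthyClustersError(RuntimeError):
    """Raised when SelectCluster returns no usable endpoint (illustrative)."""


async def select_cluster_or_raise(cluster_service, req):
    resp = await cluster_service.select_cluster(req)
    if not resp.cluster_endpoint:
        raise NoHealthyClustersError(
            f"no healthy clusters available for operation {req.operation!r}; "
            "verify cluster health in the control plane before retrying"
        )
    return resp


class FakeClusterService:
    """Stand-in service whose SelectCluster reports no endpoint."""

    async def select_cluster(self, req):
        return SimpleNamespace(cluster_endpoint="")


async def demo():
    try:
        await select_cluster_or_raise(FakeClusterService(), SimpleNamespace(operation=2))
    except NoHealthyClustersError as e:
        return str(e)
    return None


error_message = asyncio.run(demo())
print(error_message)
```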
…luster-conns-for-dataproxy-svc
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
…luster-conns-for-dataproxy-svc
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
```python
    return self._trigger_service

@property
def cluster_service(self) -> ClusterService:
```
Let's not expose this? Do we need it anywhere at all?
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
```python
    operation: cluster_payload_pb2.SelectClusterRequest.Operation,
    project_id: identifier_pb2.ProjectIdentifier,
) -> DataProxyService:
    from flyte._logging import logger
```
this is not coroutine-safe. This will lead to a race condition.
Signed-off-by: Katrina Rogan <katroganGH@gmail.com>
```python
)
return await client.upload_inputs(request)

async def _resolve(
```
you should just be using `alru_cache` on this, right?
```python
if existing is not None:
    return await existing

loop = asyncio.get_running_loop()
```
why do we need this? Why do we need a future? If there is a client, can we just not cache it? If you use `alru_cache` you can simply delete all of this code: just return the cached client and let `alru_cache` handle all of this work for you.
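For context on the race being discussed: a naive "check dict, await build, store result" cache can build the client twice when two coroutines miss concurrently. The stdlib-only sketch below shows the task-caching pattern that avoids this (caching the in-flight task rather than the finished result), which is also what a library like `alru_cache` handles internally; names here are illustrative:

```python
import asyncio

build_count = 0


async def build_client(endpoint: str) -> str:
    """Stand-in for expensive channel/client construction."""
    global build_count
    build_count += 1
    await asyncio.sleep(0)  # yield, so concurrent callers can interleave
    return f"client<{endpoint}>"


# Cache maps endpoint -> in-flight or finished Task, never a bare result.
_cache: dict = {}


async def get_client(endpoint: str) -> str:
    task = _cache.get(endpoint)
    if task is None:
        # Store the Task synchronously on first miss; later callers that
        # arrive before it finishes await the very same Task.
        task = asyncio.ensure_future(build_client(endpoint))
        _cache[endpoint] = task
    return await task


async def main():
    # Five concurrent callers race on the same endpoint.
    return await asyncio.gather(*(get_client("dns:///c1") for _ in range(5)))


clients = asyncio.run(main())
print(build_count, clients[0])
```

Because the Task is inserted before any `await`, the check-then-set window never spans a suspension point, so the client is built exactly once.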
NOT TO BE MERGED UNTIL [BE](flyteorg/flyte#7184) CHANGES ARE IN.

This PR adds a new dependency on the flyte remote client for dataproxy endpoints:

1. Use a client conn cache for dataproxy operations, using 'operation' and 'resource' as cache keys
2. On cache misses, call SelectCluster to get the respective cluster endpoint and initialize and cache a new client conn

Going forward all new dataproxy calls should use this pattern. Today this PR updates the only uses of dataproxy in the sdk: UploadInputs and CreateUploadLocation

---

_Testing_

Verified that the CreateRun path (which calls UploadInputs now) works with a local cluster

```
flyte -vvv --config .flyte/config-oss-local.yaml run -p flytesnacks -d development examples/basics/hello.py main
...
╭────────────────────────────────────────── Remote Run ───────────────────────────────────────────╮
│ Created Run: rmj7wwft78l69stlj69j │
│ URL: http://localhost:30080/v2/domain/development/project/flytesnacks/runs/rmj7wwft78l69stlj69j │
╰─────────────────────────────────────────────────────────────────────────────────────────────────╯
```

ref 26-353

Signed-off-by: Katrina Rogan <katroganGH@gmail.com>